{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Kinesis Firehose\n",
    "\n",
    "![Kinesis Firehose](img/kinesis_firehose_s3_docs.png)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "import sagemaker\n",
    "import pandas as pd\n",
    "\n",
    "# Build one boto3 session and reuse it for every client so that credentials and\n",
    "# region are resolved once instead of once per client.\n",
    "boto_session = boto3.Session()\n",
    "region = boto_session.region_name\n",
    "\n",
    "# SageMaker session, its default S3 bucket, and the notebook's execution role.\n",
    "sess = sagemaker.Session()\n",
    "bucket = sess.default_bucket()\n",
    "role = sagemaker.get_execution_role()\n",
    "\n",
    "# Service clients used throughout the notebook, all pinned to the same region.\n",
    "sm = boto_session.client(service_name='sagemaker', region_name=region)\n",
    "kinesis = boto_session.client(service_name='kinesis', region_name=region)\n",
    "firehose = boto_session.client(service_name='firehose', region_name=region)\n",
    "sts = boto_session.client(service_name='sts', region_name=region)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Download Dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 cp 's3://amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Software_v1_00.tsv.gz' ./data/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import csv\n",
    "import pandas as pd\n",
    "\n",
    "df = pd.read_csv('./data/amazon_reviews_us_Digital_Software_v1_00.tsv.gz', \n",
    "                 delimiter='\\t', \n",
    "                 quoting=csv.QUOTE_NONE,\n",
    "                 compression='gzip')\n",
    "df.shape"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.head(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_star_rating_and_review_body = df[['star_rating', 'review_body']][:100]\n",
    "df_star_rating_and_review_body.shape"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_star_rating_and_review_body.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "reviews_tsv = df_star_rating_and_review_body.to_csv(sep='\\t',\n",
    "                                                    header=None,\n",
    "                                                    index=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "reviews_tsv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r stream_name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(stream_name)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create Firehose Delivery Stream to S3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r partition_key"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(partition_key)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r data_stream_arn"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(data_stream_arn)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r iam_role_kinesis_arn"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(iam_role_kinesis_arn)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "account_id = sts.get_caller_identity()['Account']"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "firehose_name = 'dsoaws-firehose-stream'\n",
    "s3_bucket_arn = 'arn:aws:s3:::{}'.format(bucket)\n",
    "# Firehose prepends this prefix verbatim to its time-based object keys, so the\n",
    "# trailing slash is needed for delivered objects to land inside a\n",
    "# 'kinesis-firehose/' folder (instead of keys like 'kinesis-firehose2020/...').\n",
    "s3_prefix = 'kinesis-firehose/'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "# Create the delivery stream that reads from the Kinesis data stream and writes to S3.\n",
    "# Delivery stream names must be unique, so re-running this cell would otherwise fail\n",
    "# with ResourceInUseException; in that case the existing stream is looked up and reused.\n",
    "try:\n",
    "    firehose_response = firehose.create_delivery_stream(\n",
    "        DeliveryStreamName=firehose_name,\n",
    "        DeliveryStreamType='KinesisStreamAsSource',\n",
    "        KinesisStreamSourceConfiguration={\n",
    "            'KinesisStreamARN': data_stream_arn,\n",
    "            'RoleARN': iam_role_kinesis_arn\n",
    "        },\n",
    "        ExtendedS3DestinationConfiguration={\n",
    "            'RoleARN': iam_role_kinesis_arn,\n",
    "            'BucketARN': s3_bucket_arn,\n",
    "            'Prefix': s3_prefix,\n",
    "            # Buffer up to 1 MB or 60 seconds before delivering to S3.\n",
    "            'BufferingHints': {\n",
    "                'SizeInMBs': 1,\n",
    "                'IntervalInSeconds': 60\n",
    "            }\n",
    "        }\n",
    "    )\n",
    "except firehose.exceptions.ResourceInUseException:\n",
    "    existing_stream = firehose.describe_delivery_stream(DeliveryStreamName=firehose_name)\n",
    "    # Keep the same key that create_delivery_stream returns so later cells work unchanged.\n",
    "    firehose_response = {\n",
    "        'DeliveryStreamARN': existing_stream['DeliveryStreamDescription']['DeliveryStreamARN']\n",
    "    }\n",
    "\n",
    "print(firehose_response)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "firehouse_delivery_stream_arn = firehose_response['DeliveryStreamARN']\n",
    "print(firehouse_delivery_stream_arn)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Stored 'firehouse_delivery_stream_arn' (str)\n"
     ]
    }
   ],
   "source": [
    "%store firehouse_delivery_stream_arn"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Use the public IPython.display API; importing display from IPython.core.display is deprecated.\n",
    "from IPython.display import display, HTML\n",
    "\n",
    "# Render a link to the data stream's monitoring page in the AWS console.\n",
    "display(HTML('<b>Review <a href=\"https://console.aws.amazon.com/kinesis/home?region={}#/streams/details/{}/monitoring\"> Stream</a></b>'.format(region, stream_name)))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Use the public IPython.display API; importing display from IPython.core.display is deprecated.\n",
    "from IPython.display import display, HTML\n",
    "\n",
    "# Render a link to the delivery stream's monitoring page in the AWS console.\n",
    "display(HTML('<b>Review <a href=\"https://console.aws.amazon.com/firehose/home?region={}#/details/{}/monitoring\"> Firehose</a></b>'.format(region, firehose_name)))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Use the public IPython.display API; importing display from IPython.core.display is deprecated.\n",
    "from IPython.display import display, HTML\n",
    "\n",
    "# Render a link to the S3 location that the delivery stream writes to.\n",
    "display(HTML('<b>Review <a href=\"https://console.aws.amazon.com/s3/buckets/{}/{}?region={}\">Data in S3</a></b>'.format(bucket, s3_prefix, region)))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Simulate an Application and Write to Data Stream\n",
    "When you configure a Kinesis Firehose to use Kinesis Data Streams as the source, you must use the Kinesis Data Streams `PutRecord` and `PutRecords` operations to add data to the Kinesis Data Firehose.  These are the same APIs that we used in the Kinesis Data Streams notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "data_stream = boto3.Session().client(service_name='kinesis', region_name=region)\n",
    "\n",
    "# Send the TSV payload as a single record; Firehose picks it up from the data stream.\n",
    "response = data_stream.put_records(\n",
    "    Records=[\n",
    "        {\n",
    "            'Data': reviews_tsv.encode('utf-8'),\n",
    "            'PartitionKey': partition_key\n",
    "        },\n",
    "    ],\n",
    "    StreamName=stream_name\n",
    ")\n",
    "\n",
    "# put_records does not raise when individual records are rejected; failures are\n",
    "# reported per record in the response, so check it explicitly.\n",
    "if response['FailedRecordCount'] > 0:\n",
    "    failed_records = [record for record in response['Records'] if 'ErrorCode' in record]\n",
    "    raise RuntimeError('{} record(s) failed to write to {}: {}'.format(\n",
    "        response['FailedRecordCount'], stream_name, failed_records))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
